#!/usr/bin/ruby

# A command and control bot to manage a number of puppet daemons
# via the puppet agent for mcollective.  This daemon will ensure that
# puppet daemons run at a set interval and it will control the concurrency
# across the cluster as a whole.
#
# The object is to make best use of the puppetmaster resources and to provide
# nice, predictable utilization graphs of a master to assist in capacity planning.
#
# We also give priority to the Systems Administrator, if she runs an interactive
# puppetd --test the background ones will back off - assuming her run triggers
# the concurrency threshold.
#
# The concepts here were first introduced in the blog post below:
#
#    http://www.devco.net/archives/2010/03/17/scheduling_puppet_with_mcollective.php
#
# This code is released under the Apache 2 licence.
#
# See http://mcollective-plugins.googlecode.com/ for more information.


require 'mcollective'
require 'yaml'
require 'pp'
require 'logger'

include MCollective::RPC

# Precedence order of configs are:
# - defaults set here
# - settings stored in the config file
# - settings passed on the command line
#
# Built-in defaults.  Every setting the commander understands is listed here;
# the config file and the command line may only override these keys.
@config = { :interval => 30,
            :concurrency => false,
            :logfile => "/dev/null",
            :filter => "",
            :randomize => false,
            :daemonize => false}

# Layers settings read from the config file over the defaults.
#
# defaults - Hash of every known setting and its default value
# loaded   - the object parsed from the config file
#
# Returns a new Hash; neither argument is modified.  Settings missing from
# loaded keep their default and the order of the keys does not matter.
#
# Raises ArgumentError when loaded is not a Hash or names a setting that has
# no default, so a misspelt key is reported rather than silently ignored.
def merge_file_settings(defaults, loaded)
    unless loaded.is_a?(Hash)
        raise ArgumentError, "expected a hash of settings, got #{loaded.class}"
    end

    unknown = loaded.keys - defaults.keys

    unless unknown.empty?
        raise ArgumentError, "unknown settings #{unknown.inspect}"
    end

    defaults.merge(loaded)
end

if File.exist?("/etc/puppetcommander.cfg")
    begin
        # Parse the file once and let it override the defaults
        @config = merge_file_settings(@config, YAML::load_file('/etc/puppetcommander.cfg'))
    rescue StandardError, SyntaxError => e
        # SyntaxError is listed as well because some older YAML parsers are
        # believed to report malformed input with it rather than a StandardError
        puts "Failed to load config file /etc/puppetcommander.cfg: #{e}"
        exit 1
    end
end

# Parse the command line.  rpcoptions is presumably supplied by the included
# MCollective::RPC module - its result is kept in @options, while the switches
# declared here write straight into @config and so override whatever the
# defaults or the config file provided.
@options = rpcoptions do |parser, _options|
    parser.define_head "Command and Control agent to schedule puppet runs in a cluster"

    parser.on('--interval [MINUTES]', '-i', 'Interval to run clients at') { |minutes| @config[:interval] = minutes.to_i }

    parser.on('--max-concurrent [ACTIVE]', '-m [ACTIVE]', 'Maximum run concurrency to allow') { |active| @config[:concurrency] = active.to_i }

    parser.on('--logfile [LOGFILE]', '-l [LOGFILE]', 'Log file to write') { |path| @config[:logfile] = path }

    parser.on("--daemonize", "-d", "Daemonizes the script") { |_flag| @config[:daemonize] = true }
end

# Only process the config filters if we didn't receive any on the command line
# which would be in @options[:filter]
#
# The configured filter is a space separated list.  A token of the form
# name=value becomes a fact filter, any other token is taken as a class name.
#
# The final filter that should be active is stored in @options
if @options[:filter]["fact"] == [] && @options[:filter]["agent"] == []
    @config[:filter].split(" ").each do |f|
        if f =~ /^(.+?)=(.+)/
            @options[:filter]["fact"] << {:fact => $1, :value => $2}
        else
            # append the token itself; there is no other local to take it from
            @options[:filter]["cf_class"] << f
        end
    end
end

# Logger shared by the log and debug helpers; it writes to whatever
# @config[:logfile] holds once defaults, config file and command line
# have been applied.
@logger = Logger.new(@config[:logfile])

# Writes msg to the commander log at INFO level.
#
# msg - the object to log; it is handed to the logger unchanged
#
# A failure while logging is reported on stdout instead of being raised, so
# that a broken log never stops the scheduling loop.  Only StandardError is
# rescued: signals and exit requests must still be able to end the process.
def log(msg)
    @logger.add(Logger::INFO) { msg }
rescue StandardError => e
    puts "Could not log '#{msg}': #{e}"
end

# Writes msg to the commander log at DEBUG level, but only when
# @options[:verbose] is set; otherwise nothing is logged.
#
# msg - the object to log; it is handed to the logger unchanged
#
# A failure while logging is reported on stdout instead of being raised.
# Only StandardError is rescued: signals and exit requests must still be
# able to end the process.
def debug(msg)
    @logger.add(Logger::DEBUG) { msg } if @options[:verbose]
rescue StandardError => e
    puts "Could not log '#{msg}': #{e}"
end

# Does a SimpleRPC status call and adds up the :running value reported in
# the :data of every reply, giving the number of puppet daemons that are
# currently running a catalog.
#
# A reply that carries no :data or no :running value counts as 0 rather
# than raising, so one bad reply cannot abort a whole scheduling cycle.
#
# Returns the total as an Integer, 0 when there were no replies.
def concurrent_count
    debug("Getting puppet status")

    @puppet.status.inject(0) do |total, reply|
        # nil.to_i is 0, which covers replies without a :running value
        total + (reply[:data] || {})[:running].to_i
    end
end

# Asks a single client to do a forced puppet run.
#
# The request is a "runonce" action for the "puppetd" agent, sent through
# the underlying client of @puppet with a copy of the active filter whose
# "identity" entry is set to the given client.  The SimpleRPC internals are
# bypassed: the request is just sent, without caring for the results.
#
# From 0.4.4 we should be able to at least get the msg from the
# SimpleRPC libraries using the new_request method
#
# client - identity of the node to run
#
# Returns whatever sendreq returns.
def run_client(client)
    log("Running agent for #{client}")

    request = {:agent  => "puppetd",
               :action => "runonce",
               :caller => "uid=#{Process.uid}",
               :data   => {:forcerun => true}}

    # work on a copy so the filter held by @puppet keeps its own identity entry
    target = @puppet.filter.clone
    target["identity"] = client

    @puppet.client.sendreq(request, request[:agent], target)
end

# This is the beef of things, we take a desired interval and
# maximum allowed concurrency and perform one pass over the cluster:
#
# - reset the client and discover the nodes that match the active filter
# - if nothing was discovered, log that and return
# - figure out the sleep interval needed to run every node once within
#   the supplied interval
# - shuffle the nodes when the :randomize config option is set, otherwise
#   sort them
# - for each node, run it as long as the current concurrency is below the
#   limit, otherwise skip it for this pass
# - after each node, sleep for whatever is left of the sleep interval; when
#   handling the node took longer than that we do not sleep at all
#
# The caller loops over this method, so every pass rediscovers - this picks
# up new nodes and recovers gracefully from network outages and such.
#
# interval    - minutes in which every node should have been run once
# concurrency - maximum number of simultaneous runs, or false/nil for no limit
#
# Errors raised during a pass are logged and end that pass.  Only
# StandardError is rescued, so an interrupt or a termination signal still
# stops the commander.
def run(interval, concurrency)
    @puppet.reset

    clients = @puppet.discover :verbose => false

    # an empty list must be handled here too, it would divide by zero below
    if clients.nil? || clients.empty?
        log("No Puppet clients found.")
        return
    end

    begin
        sleeptime = interval * 60 / clients.size

        log("Found #{clients.size} puppet nodes, sleeping for ~#{sleeptime} seconds between runs")

        clients = @config[:randomize] ? clients.sort_by { rand } : clients.sort

        clients.each do |client|
            starttime = Time.now.to_i

            cur_concurrency = concurrent_count
            log("Current puppetd's running: #{cur_concurrency}")

            if concurrency && cur_concurrency >= concurrency
                log("Puppet run for client #{client} skipped due to current concurrency of #{cur_concurrency}")
            else
                run_client(client)
            end

            # take the time spent on this node off once, and never go below
            # zero since sleep rejects a negative duration
            remaining = [sleeptime - (Time.now.to_i - starttime), 0].max
            log("Sleeping for #{remaining} seconds")

            sleep remaining
        end
    rescue StandardError => e
        log(e)
    end
end

# Client for the "puppetd" agent, created with the options parsed from the
# command line.  Discovery, status queries and run requests all go through it.
@puppet = rpcclient("puppetd", :options => @options)
# NOTE(review): presumably turns off the client's progress display - confirm
@puppet.progress = false

# Runs the worker loop in the current process: one scheduling pass after
# another, each using the interval and concurrency currently in @config.
def foreground
    loop { run(@config[:interval], @config[:concurrency]) }
end

# Runs the worker loop in a forked child process.
#
# The child records its own pid in /var/run/puppetcommander.pid, ignores
# HUP and then does one scheduling pass after another.  The parent detaches
# from the child and returns.
def background
    child = fork do
        File.open("/var/run/puppetcommander.pid", "w") { |pidfile| pidfile.puts $$ }

        Signal.trap("HUP", "IGNORE")

        loop { run(@config[:interval], @config[:concurrency]) }
    end

    Process.detach(child)
end

# Record the effective settings, then hand over to the worker loop
log("Looping clients with an interval of #{@config[:interval]} minutes")

if @config[:concurrency]
    log("Restricting to #{@config[:concurrency]} concurrent puppet runs")
end

# --daemonize (or :daemonize in the config file) selects the forked loop
@config[:daemonize] ? background : foreground
